// During concurrent close() calls we want to make sure that all of them return after the node has completed its shutdown cycle.
// If not, the hook that is added in Bootstrap#setup() will be useless:
// close() might not be executed, in case another (for example api) call to close() has already set some lifecycles to stopped.
// In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
    // While holding the lifecycle monitor: call stop() if the node is still started, then
    // return early when lifecycle.moveToClosed() returns false.
    synchronized (lifecycle) {
        if (lifecycle.started()) {
            stop();
        }
        if (!lifecycle.moveToClosed()) {
            return;
        }
    }
    // NOTE: this listing is an excerpt - toClose and stopWatch are used below, but their
    // declarations (and any entries added to toClose before this point) are not shown here.
    // The lambdas are only queued here; presumably they run when IOUtils.close(toClose) is
    // invoked at the end of this method - confirm against IOUtils.
    toClose.add(() -> stopWatch.stop().start("thread_pool"));
    toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
    // Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
    // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
    // awaitClose if the node doesn't finish closing within the specified time.
    // The per-service timing report is queued only when trace logging is enabled.
    if (logger.isTraceEnabled()) {
        toClose.add(() -> logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()));
    }
    IOUtils.close(toClose);
    logger.info("closed");
}
// NOTE: this listing is an excerpt - the enclosing method's signature and any statements
// before this point are not shown. The services below are stopped sequentially, in the
// order written.
injector.getInstance(SnapshotsService.class).stop();
injector.getInstance(SnapshotShardsService.class).stop();
injector.getInstance(RepositoriesService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// close discovery early to not react to pings anymore.
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
// NOTE(review): the comment above looks stale - in this listing IndicesService is the last
// service stopped (see the end of this sequence), not the one stopped here; confirm.
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
// Plugin-provided lifecycle components are stopped after the built-in services above.
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
// if we had scroll searchers etc or recovery going on we wait for them to finish.
injector.getInstance(IndicesService.class).stop();
logger.info("stopped");
return this;
}
各模块的关闭有一定的顺序关系。以 doStop 为例,按下表所示的顺序调用各模块的 doStop 方法。
HTTP 传输服务,提供REST接口服务
Service responsible for maintaining and providing access to snapshot repositories on nodes
Runs periodically and attempts to create a temp file to see if the filesystem is writable. If not then it marks the path as unhealthy
// Copy indices because we modify it asynchronously in the body of the loop final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(indices.size()); for (final Index index : indices) { indicesStopExecutor.execute(() -> { try { removeIndex(index, IndexRemovalReason.SHUTDOWN, "shutdown"); } finally { latch.countDown(); } }); } try { // 注意shardsClosedTimeout 这个值是在IndicesService的构造函数中初始化的 // this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); // 也就是说 CountDownLatch.await默认1天才会继续后面的流程 if (latch.await(shardsClosedTimeout.seconds(), TimeUnit.SECONDS) == false) { logger.warn("Not all shards are closed yet, waited {}sec - stopping service", shardsClosedTimeout.seconds()); } } catch (InterruptedException e) { // ignore } finally { indicesStopExecutor.shutdown(); } }
1 2 3 4 5 6
IndicesService removeIndex(final Index index, final IndexRemovalReason reason, final String extraInfo)=>
IndexService close(final String reason, boolean delete)=>
IndexService removeShard(int shardId, String reason)=>
IndexService removeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener)=>
IndexShard close(String reason, boolean flushEngine)=>
Engine flushAndClose()=>
/** * Flush the engine (committing segments to disk and truncating the * translog) and close it. */ public void flushAndClose() throws IOException { if (isClosed.get() == false) { logger.trace("flushAndClose now acquire writeLock"); // 可以看一下: // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L857 // 由于写入操作已经加了读锁,此时写锁会等待,直到写入执行完毕。 // 因此数据写入过程不会被中断。但是由于网络模块被关闭,客户端的连接会被断开。 // 客户端应当作为失败处理,虽然ES服务端的写流程还在继续。 try (ReleasableLock lock = writeLock.acquire()) { logger.trace("flushAndClose now acquired writeLock"); try { logger.debug("flushing shard on close - this might take some time to sync files to disk"); try { // TODO we might force a flush in the future since we have the write lock already even though recoveries // are running. flush(); } catch (AlreadyClosedException ex) { logger.debug("engine already closed - skipping flushAndClose"); } } finally { close(); // double close is not a problem } } } awaitPendingClose(); }